package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.class */
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
    @Test(timeout = 60000)
    public void testReceiverCanDrainMessages() throws Exception {
        sendMessages(getTestName(), 20);
        AmqpConnection connect = createAmqpClient().connect();
        AmqpReceiver createReceiver = connect.createSession().createReceiver(getTestName());
        Queue proxyToQueue = getProxyToQueue(getTestName());
        assertEquals(20, proxyToQueue.getMessageCount());
        createReceiver.drain(20);
        for (int i = 0; i < 20; i++) {
            AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull(receive);
            receive.accept();
        }
        createReceiver.close();
        assertEquals(0L, proxyToQueue.getMessageCount());
        connect.close();
    }

    @Test(timeout = 60000)
    public void testPullWithNoMessageGetDrained() throws Exception {
        AmqpConnection connect = createAmqpClient().connect();
        AmqpReceiver createReceiver = connect.createSession().createReceiver(getTestName());
        createReceiver.flow(10);
        assertEquals(0L, getProxyToQueue(getTestName()).getMessageCount());
        assertEquals(0L, r0.getDeliveringCount());
        assertEquals(10L, createReceiver.getReceiver().getRemoteCredit());
        assertNull(createReceiver.pull(1L, TimeUnit.SECONDS));
        assertEquals(0L, createReceiver.getReceiver().getRemoteCredit());
        connect.close();
    }

    @Test(timeout = 60000)
    public void testPullOneFromRemote() throws Exception {
        sendMessages(getTestName(), 20);
        AmqpConnection connect = createAmqpClient().connect();
        AmqpReceiver createReceiver = connect.createSession().createReceiver(getTestName());
        Queue proxyToQueue = getProxyToQueue(getTestName());
        assertEquals(20, proxyToQueue.getMessageCount());
        assertEquals(0L, createReceiver.getReceiver().getRemoteCredit());
        AmqpMessage pull = createReceiver.pull(5L, TimeUnit.SECONDS);
        assertNotNull(pull);
        pull.accept();
        assertEquals(0L, createReceiver.getReceiver().getRemoteCredit());
        createReceiver.close();
        assertEquals(20 - 1, proxyToQueue.getMessageCount());
        assertEquals(1L, proxyToQueue.getMessagesAcknowledged());
        connect.close();
    }

    @Test(timeout = 60000)
    public void testMultipleZeroResultPulls() throws Exception {
        AmqpConnection connect = createAmqpClient().connect();
        AmqpReceiver createReceiver = connect.createSession().createReceiver(getTestName());
        createReceiver.flow(10);
        assertEquals(0L, getProxyToQueue(getTestName()).getMessageCount());
        assertEquals(10L, createReceiver.getReceiver().getRemoteCredit());
        assertNull(createReceiver.pull(1L, TimeUnit.SECONDS));
        assertEquals(0L, createReceiver.getReceiver().getRemoteCredit());
        assertNull(createReceiver.pull(1L, TimeUnit.SECONDS));
        assertNull(createReceiver.pull(1L, TimeUnit.SECONDS));
        assertEquals(0L, createReceiver.getReceiver().getRemoteCredit());
        connect.close();
    }

    public void sendMessages(String str, int i) throws Exception {
        AmqpConnection amqpConnection = null;
        try {
            amqpConnection = createAmqpClient().connect();
            AmqpSender createSender = amqpConnection.createSession().createSender(str);
            for (int i2 = 0; i2 < i; i2++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setText("Test-Message-" + i2);
                createSender.send(amqpMessage);
            }
            createSender.close();
            if (amqpConnection != null) {
                amqpConnection.close();
            }
        } catch (Throwable th) {
            if (amqpConnection != null) {
                amqpConnection.close();
            }
            throw th;
        }
    }
}
